/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.vector.accessor.writer;

import java.util.ArrayList;
import java.util.List;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.ColumnReader;
import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.DictWriter;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.drill.exec.vector.accessor.VariantWriter;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.reader.MapReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation for a writer for a tuple (a row or a map.) Provides access to each
 * column using either a name or a numeric index.
 * <p>
 * A tuple maintains an internal state needed to handle dynamic column additions.
 * The state identifies the amount of "catch up" needed to get the new column into
 * the same state as the existing columns. The state is also handy for understanding
 * the tuple lifecycle. This lifecycle works for all three cases of:
 * <ul>
 * <li>Top-level tuple (row).</li>
 * <li>Nested tuple (map).</li>
 * <li>Array of tuples (repeated map).</li>
 * </ul>
 *
 * Specifically, the transitions, for batch, row and array events, are:
 *
 * <table border=1>
 * <tr><th>Public API</th><th>Tuple Event</th><th>State Transition</th>
 *     <th>Child Event</th></tr>
 * <tr><td>(Start state)</td>
 *     <td>&mdash;</td>
 *     <td>IDLE</td>
 *     <td>&mdash;</td></tr>
 * <tr><td>startBatch()</td>
 *     <td>startWrite()</td>
 *     <td>IDLE &rarr; IN_WRITE</td>
 *     <td>startWrite()</td></tr>
 * <tr><td>start() (new row)</td>
 *     <td>startRow()</td>
 *     <td>IN_WRITE &rarr; IN_ROW</td>
 *     <td>startRow()</td></tr>
 * <tr><td>start() (without save)</td>
 *     <td>restartRow()</td>
 *     <td>IN_ROW &rarr; IN_ROW</td>
 *     <td>restartRow()</td></tr>
 * <tr><td>save() (array)</td>
 *     <td>saveValue()</td>
 *     <td>IN_ROW &rarr; IN_ROW</td>
 *     <td>saveValue()</td></tr>
 * <tr><td rowspan=2>save() (row)</td>
 *     <td>saveValue()</td>
 *     <td>IN_ROW &rarr; IN_ROW</td>
 *     <td>saveValue()</td></tr>
 * <tr><td>saveRow()</td>
 *     <td>IN_ROW &rarr; IN_WRITE</td>
 *     <td>saveRow()</td></tr>
 * <tr><td rowspan=2>end batch</td>
 *     <td>&mdash;</td>
 *     <td>IN_ROW &rarr; IDLE</td>
 *     <td>endWrite()</td></tr>
 * <tr><td>&mdash;</td>
 *     <td>IN_WRITE &rarr; IDLE</td>
 *     <td>endWrite()</td></tr>
 * </table>
 *
 * Notes:
 * <ul>
 * <li>For the top-level tuple, a special case occurs with ending a batch. (The
 *     method for doing so differs depending on implementation.) If a row is active,
 *     then that row's values are discarded. Then, the batch is ended.</li>
 * </ul>
 */
public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {

  /**
   * Generic object wrapper for the tuple writer.
   */
  public static class TupleObjectWriter extends AbstractObjectWriter {

    protected final AbstractTupleWriter tupleWriter;

    public TupleObjectWriter(AbstractTupleWriter tupleWriter) {
      this.tupleWriter = tupleWriter;
    }

    @Override
    public ColumnWriter writer() { return tupleWriter; }

    @Override
    public TupleWriter tuple() { return tupleWriter; }

    @Override
    public WriterEvents events() { return tupleWriter; }

    @Override
    public void dump(HierarchicalFormatter format) {
      format
        .startObject(this)
        .attribute("tupleWriter");
      tupleWriter.dump(format);
      format.endObject();
    }
  }

  /**
   * Listener (callback) to handle requests to add a new column to a tuple (row
   * or map). Implemented and bound by the client code that creates or uses the
   * tuple writer. If no listener is bound, then an attempt to add a column
   * throws an exception.
   */
  public static interface TupleWriterListener {

    ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column);

    ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);

    boolean isProjected(String columnName);
  }

  /**
   * Wrap the outer index to avoid incrementing the array index
   * on the call to <tt>nextElement().</tt> The increment
   * is done at the tuple level, not the column level.
   */
  static class MemberWriterIndex implements ColumnWriterIndex {
    private final ColumnWriterIndex baseIndex;

    MemberWriterIndex(ColumnWriterIndex baseIndex) {
      this.baseIndex = baseIndex;
    }

    @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); }
    @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
    @Override public void nextElement() { }
    @Override public void prevElement() { }
    @Override public void rollover() { }

    @Override public ColumnWriterIndex outerIndex() {
      return baseIndex.outerIndex();
    }

    @Override
    public String toString() {
      return "[" + getClass().getSimpleName() + " baseIndex = " + baseIndex.toString() + "]";
    }
  }

  protected static final Logger logger = LoggerFactory.getLogger(AbstractTupleWriter.class);

  protected final TupleMetadata tupleSchema;
  protected final List<AbstractObjectWriter> writers;
  protected ColumnWriterIndex vectorIndex;
  protected ColumnWriterIndex childIndex;
  protected TupleWriterListener listener;
  protected State state = State.IDLE;

  protected AbstractTupleWriter(TupleMetadata schema, List<AbstractObjectWriter> writers) {
    this.tupleSchema = schema;
    this.writers = writers;
  }

  protected AbstractTupleWriter(TupleMetadata schema) {
    this(schema, new ArrayList<AbstractObjectWriter>());
  }

  @Override
  public ObjectType type() { return ObjectType.TUPLE; }

  protected void bindIndex(ColumnWriterIndex index, ColumnWriterIndex childIndex) {
    vectorIndex = index;
    this.childIndex = childIndex;

    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().bindIndex(childIndex);
    }
  }

  @Override
  public void bindIndex(ColumnWriterIndex index) {
    bindIndex(index, index);
  }

  @Override
  public int rowStartIndex() {
    return vectorIndex.rowStartIndex();
  }

  /**
   * Add a column writer to an existing tuple writer. Used for implementations
   * that support "live" schema evolution: column discovery while writing.
   * The corresponding metadata must already have been added to the schema.
   *
   * @param colWriter the column writer to add
   */
  public int addColumnWriter(AbstractObjectWriter colWriter) {
    assert writers.size() == tupleSchema.size();
    final int colIndex = tupleSchema.addColumn(colWriter.schema());
    writers.add(colWriter);
    colWriter.events().bindIndex(childIndex);
    if (state != State.IDLE) {
      colWriter.events().startWrite();
      if (state == State.IN_ROW) {
        colWriter.events().startRow();
      }
    }
    return colIndex;
  }

  @Override
  public boolean isProjected(String columnName) {
    return listener == null ? true
        : listener.isProjected(columnName);
  }

  @Override
  public int addColumn(ColumnMetadata column) {
    verifyAddColumn(column.name());
    return addColumnWriter(
        (AbstractObjectWriter) listener.addColumn(this, column));
  }

  @Override
  public int addColumn(MaterializedField field) {
    verifyAddColumn(field.getName());
    return addColumnWriter(
        (AbstractObjectWriter) listener.addColumn(this, field));
  }

  private void verifyAddColumn(String colName) {
    if (listener == null) {
      throw new UnsupportedOperationException("addColumn");
    }

    if (tupleSchema().column(colName) != null) {
      throw UserException
        .validationError()
        .message("Duplicate column name: %s", colName)
        .build(logger);
    }
  }

  @Override
  public TupleMetadata tupleSchema() { return tupleSchema; }

  @Override
  public int size() { return tupleSchema().size(); }

  @Override
  public boolean nullable() { return false; }

  @Override
  public void setNull() {
    throw new IllegalStateException("Not nullable");
  }

  @Override
  public void startWrite() {
    assert state == State.IDLE;
    state = State.IN_WRITE;
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().startWrite();
    }
  }

  @Override
  public void startRow() {
    // Must be in a write. Can start a row only once.
    // To restart, call restartRow() instead.

    assert state == State.IN_WRITE;
    state = State.IN_ROW;
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().startRow();
    }
  }

  @Override
  public void endArrayValue() {
    assert state == State.IN_ROW;
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().endArrayValue();
    }
  }

  @Override
  public void restartRow() {

    // Rewind is normally called only when a value is active: it resets
    // pointers to allow rewriting the value. However, if this tuple
    // is nested in an array, then the array entry could have been
    // saved (state here is IN_WRITE), but the row as a whole has
    // not been saved. Thus, we must also allow a rewind() while in
    // the IN_WRITE state to set the pointers back to the start of
    // the current row.

    assert state == State.IN_ROW;
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().restartRow();
    }
  }

  @Override
  public void saveRow() {
    assert state == State.IN_ROW;
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().saveRow();
    }
    state = State.IN_WRITE;
  }

  @Override
  public void preRollover() {

    // Rollover can only happen while a row is in progress.

    assert state == State.IN_ROW;
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).events().preRollover();
    }
  }

  @Override
  public void postRollover() {

    // Rollover can only happen while a row is in progress.

    assert state == State.IN_ROW;
    for (AbstractObjectWriter writer : writers) {
      writer.events().postRollover();
    }
  }

  @Override
  public void endWrite() {
    assert state != State.IDLE;
    for (AbstractObjectWriter writer : writers) {
      writer.events().endWrite();
    }
    state = State.IDLE;
  }

  @Override
  public void copy(ColumnReader from) {
    MapReader source = (MapReader) from;
    // Assumes a 1:1 correspondence between source and
    // destination tuples. That is, does not handle the
    // case of projection: more columns on one side vs.
    // the other. That must be handled outside this class.
    for (int i = 0; i < writers.size(); i++) {
      writers.get(i).writer().copy(source.column(i).reader());
    }
  }

  @Override
  public ObjectWriter column(int colIndex) {
    return writers.get(colIndex);
  }

  @Override
  public ObjectWriter column(String colName) {
    final int index = tupleSchema.index(colName);
    if (index == -1) {
      throw new UndefinedColumnException(colName);
    }
    return writers.get(index);
  }

  @Override
  public void set(int colIndex, Object value) {
    column(colIndex).setObject(value);
  }

  @Override
  public void setObject(Object value) {
    final Object[] values = (Object[]) value;
    if (values.length != tupleSchema.size()) {
      if (schema() == null) {
        throw new IllegalArgumentException(
            String.format("Row has %d columns, but value array has " +
                " %d values.",
                tupleSchema.size(), values.length));
      } else {
        throw new IllegalArgumentException(
            String.format("Map %s has %d columns, but value array has " +
                " %d values.",
                schema().name(),
                tupleSchema.size(), values.length));
      }
    }
    for (int i = 0; i < values.length; i++) {
      set(i, values[i]);
    }
  }

  @Override
  public ScalarWriter scalar(int colIndex) {
    return column(colIndex).scalar();
  }

  @Override
  public ScalarWriter scalar(String colName) {
    return column(colName).scalar();
  }

  @Override
  public TupleWriter tuple(int colIndex) {
    return column(colIndex).tuple();
  }

  @Override
  public TupleWriter tuple(String colName) {
    return column(colName).tuple();
  }

  @Override
  public ArrayWriter array(int colIndex) {
    return column(colIndex).array();
  }

  @Override
  public ArrayWriter array(String colName) {
    return column(colName).array();
  }

  @Override
  public VariantWriter variant(int colIndex) {
    return column(colIndex).variant();
  }

  @Override
  public VariantWriter variant(String colName) {
    return column(colName).variant();
  }

  @Override
  public DictWriter dict(int colIndex) {
    return column(colIndex).dict();
  }

  @Override
  public DictWriter dict(String colName) {
    return column(colName).dict();
  }

  @Override
  public ObjectType type(int colIndex) {
    return column(colIndex).type();
  }

  @Override
  public ObjectType type(String colName) {
    return column(colName).type();
  }

  @Override
  public boolean isProjected() { return true; }

  @Override
  public int lastWriteIndex() {
    return vectorIndex.vectorIndex();
  }

  @Override
  public int writeIndex() {
    return vectorIndex.vectorIndex();
  }

  public void bindListener(TupleWriterListener listener) {
    this.listener = listener;
  }

  public TupleWriterListener listener() { return listener; }

  @Override
  public void bindListener(ColumnWriterListener listener) { }

  @Override
  public void dump(HierarchicalFormatter format) {
    format
      .startObject(this)
      .attribute("vectorIndex", vectorIndex)
      .attribute("state", state)
      .attributeArray("writers");
    for (int i = 0; i < writers.size(); i++) {
      format.element(i);
      writers.get(i).dump(format);
    }
    format
      .endArray()
      .endObject();
  }
}
